agentmux_srv\backend\blockcontroller/
subprocess.rs

1// Copyright 2025, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! SubprocessController: manages agent CLI as stateless per-turn subprocess invocations.
5//!
6//! Architecture:
7//!   Each user message spawns a fresh `claude -p` process.
8//!   Multi-turn continuity uses `--resume <session-id>`.
9//!   The process reads one JSON message from stdin, runs the agentic loop,
10//!   streams NDJSON on stdout, then exits.
11//!
12//! State machine:
13//!   INIT ─(spawn)─> RUNNING ─(process exits)─> DONE
14//!   DONE ─(new message)─> RUNNING (re-spawn with --resume)
15//!
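//! I/O model (per turn):
//! 1. stdin writer: dedicated OS thread writes the user message, then closes stdin
//! 2. stdout_reader: piped stdout → .jsonl persistence + WPS blockfile events on "output" subject
//! 3. stderr reader: piped stderr → log lines only (not published)
//! 4. health watchdog: checks agent health every 5s while the turn is active
//! 5. process_waiter: wait for exit, update status, publish lifecycle event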
19
20
21use std::collections::{HashMap, VecDeque};
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::{Arc, Mutex};
24
25use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
26
27
28use super::{
29    BlockControllerRuntimeStatus, BlockInputUnion, Controller, STATUS_DONE, STATUS_INIT,
30    STATUS_RUNNING,
31};
32use super::health::{classify_output_line, HealthMonitor};
33use crate::backend::eventbus::EventBus;
34use crate::backend::storage::filestore::FileStore;
35use crate::backend::storage::wstore::WaveStore;
36use crate::backend::wps;
37
/// WPS file subject name for subprocess output (replaces "term" from PTY).
/// Used as the subject when stdout lines are appended to the block file.
pub const SUBPROCESS_OUTPUT_SUBJECT: &str = "output";

/// Controller type constant identifying the subprocess controller.
pub const BLOCK_CONTROLLER_SUBPROCESS: &str = "subprocess";
43
/// Configuration for spawning a subprocess turn.
///
/// One value describes exactly one user message: the command line to run,
/// the environment to run it in, and the payload to feed it on stdin.
/// If a turn is already running, `spawn_turn` queues the whole config and
/// returns without spawning.
#[derive(Debug, Clone)]
pub struct SubprocessSpawnConfig {
    /// CLI executable (e.g., "claude").
    pub cli_command: String,
    /// CLI arguments (e.g., ["-p", "--output-format", "stream-json", ...]).
    /// The resume flag and session id are appended to a copy of these at
    /// spawn time; this vector itself is not modified.
    pub cli_args: Vec<String>,
    /// Working directory for the subprocess. Empty means "do not set one".
    /// A leading `~` is expanded to the home directory and the directory is
    /// created if it does not exist yet.
    pub working_dir: String,
    /// Environment variables to set. Each value is passed through home-dir
    /// expansion before being handed to the child.
    pub env_vars: HashMap<String, String>,
    /// The user's JSON message to write to stdin. A trailing newline is
    /// appended and stdin is closed after the write.
    pub message: String,
    /// Flag used to resume a previous session, e.g. "--resume" (Claude), "-r" (Gemini).
    /// Empty string means this provider does not support simple-flag resume,
    /// in which case no resume arguments are appended.
    pub resume_flag: String,
    /// JSON field name in the CLI's init event that contains the session/thread ID.
    /// e.g. "session_id" (Claude/Gemini) or "thread_id" (Codex).
    /// Looked up as a top-level key on every parsed stdout line.
    pub session_id_field: String,
    /// Optional client-supplied message id. Echoed back via the
    /// `agent-message-accepted` event when this config transitions from
    /// queued → running so the frontend can pair the event with a
    /// pending `PendingMessage`. None means no feedback is emitted.
    pub message_id: Option<String>,
    /// Session id to hydrate `inner.session_id` with BEFORE the first
    /// turn — used by the picker "My Agents" reattach path. The
    /// caller reads this from `block.meta["agent:sessionid"]` which
    /// the frontend pre-populates from the prior block's session id
    /// when launching with `continueOfInstanceId`. Without this
    /// hydration `spawn_turn` would only see the captured session id
    /// AFTER the first turn — meaning the first turn always launches
    /// the CLI fresh (no `--resume <sid>`) and starts a new
    /// conversation that re-injects the startup context.
    ///
    /// Empty / `None` means "no prior session" (greenfield launch).
    ///
    /// Best-effort, not authoritative: hydration only applies while the
    /// controller has no session id yet, and the CLI's stdout-emitted
    /// session id always overwrites it at capture time (see
    /// `spawn_turn`'s stdout-reader block). The reattach turn passes
    /// the (possibly stale) hydrated id via `--resume`; whatever id
    /// the CLI then emits becomes the in-memory authority for every
    /// subsequent turn.
    pub session_id: Option<String>,
}
89
/// Inner state protected by mutex.
///
/// Everything that changes from turn to turn lives here so it can be
/// shared (behind `Arc<Mutex<…>>`) with the per-turn reader tasks.
struct SubprocessControllerInner {
    /// Current process status (one of the `STATUS_*` strings, starting
    /// at `STATUS_INIT`).
    proc_status: String,
    /// Process exit code from the most recent turn. Set to `-1` when the
    /// spawn itself fails.
    proc_exit_code: i32,
    /// Status version counter (incremented on each change).
    status_version: i32,
    /// Session ID for `--resume`. Either captured from the CLI's stdout
    /// (authoritative, always overwrites) or hydrated from the spawn
    /// config while still `None`.
    session_id: Option<String>,
    /// PID of the currently running subprocess (None if idle). May hold
    /// `0` when the child's PID could not be obtained at spawn time.
    current_pid: Option<u32>,
    /// Handle to kill the current subprocess.
    kill_tx: Option<tokio::sync::oneshot::Sender<bool>>,
    /// Messages queued while a turn is in progress.
    /// Drained sequentially after the current turn exits.
    pending_messages: VecDeque<SubprocessSpawnConfig>,
}
108
/// SubprocessController manages per-turn subprocess lifecycle for agent blocks.
///
/// Unlike `ShellController` which maintains a long-running PTY process,
/// `SubprocessController` spawns a fresh process for each user turn.
/// Multi-turn continuity comes from `--resume <session-id>`.
///
/// The broker, event bus and stores are all optional; every publish or
/// persist step is skipped when the corresponding handle is `None`.
pub struct SubprocessController {
    /// Parent tab UUID.
    /// NOTE(review): marked `dead_code` — presumably kept for parity with
    /// other controllers; confirm before removing.
    #[allow(dead_code)]
    tab_id: String,
    /// Block UUID. Used in event scopes, log fields and object references.
    block_id: String,
    /// Prevents concurrent spawns. `true` while a turn holds the run lock.
    run_lock: Arc<AtomicBool>,
    /// Protected inner state.
    inner: Arc<Mutex<SubprocessControllerInner>>,
    /// WPS broker for publishing events (blockfile, controllerstatus).
    broker: Option<Arc<wps::Broker>>,
    /// Event bus for obj:update broadcasts.
    event_bus: Option<Arc<EventBus>>,
    /// Wave object store for block metadata persistence.
    wstore: Option<Arc<WaveStore>>,
    /// FileStore for write-through persistence of output lines (Phase 1.3).
    filestore: Option<Arc<FileStore>>,
    /// Agent health monitor (output activity + error tracking).
    health_monitor: Arc<HealthMonitor>,
    /// Weak self-reference for queue drain. Set by `set_self_ref` after
    /// the controller is wrapped in Arc; `None` until then.
    self_ref: Mutex<Option<std::sync::Weak<Self>>>,
}
138
139impl SubprocessController {
140    /// Create a new SubprocessController.
141    pub fn new(
142        tab_id: String,
143        block_id: String,
144        broker: Option<Arc<wps::Broker>>,
145        event_bus: Option<Arc<EventBus>>,
146        wstore: Option<Arc<WaveStore>>,
147        filestore: Option<Arc<FileStore>>,
148    ) -> Self {
149        let health_monitor = Arc::new(HealthMonitor::new(
150            block_id.clone(),
151            broker.clone(),
152        ));
153        Self {
154            tab_id,
155            block_id,
156            run_lock: Arc::new(AtomicBool::new(false)),
157            inner: Arc::new(Mutex::new(SubprocessControllerInner {
158                proc_status: STATUS_INIT.to_string(),
159                proc_exit_code: 0,
160                status_version: 0,
161                session_id: None,
162                current_pid: None,
163                kill_tx: None,
164                pending_messages: VecDeque::new(),
165            })),
166            broker,
167            event_bus,
168            wstore,
169            filestore,
170            health_monitor,
171            self_ref: Mutex::new(None),
172        }
173    }
174
175    /// Store a weak self-reference so the process_waiter can drain queued
176    /// messages by calling spawn_turn after the current turn exits.
177    /// Must be called after wrapping in Arc.
178    pub fn set_self_ref(self: &Arc<Self>) {
179        *self.self_ref.lock().unwrap() = Some(Arc::downgrade(self));
180    }
181
182    /// Try to acquire the run lock. Returns false if a turn is already in progress.
183    fn try_lock_run(&self) -> bool {
184        self.run_lock
185            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
186            .is_ok()
187    }
188
189    /// Release the run lock.
190    fn unlock_run(&self) {
191        self.run_lock.store(false, Ordering::SeqCst);
192    }
193
194    /// Update process status and increment version (must hold inner lock).
195    fn set_status(inner: &mut SubprocessControllerInner, status: &str) {
196        inner.proc_status = status.to_string();
197        inner.status_version += 1;
198    }
199
200    /// Get the runtime status (snapshot).
201    fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
202        let inner = self.inner.lock().unwrap();
203        BlockControllerRuntimeStatus {
204            blockid: self.block_id.clone(),
205            version: inner.status_version,
206            shellprocstatus: inner.proc_status.clone(),
207            shellprocconnname: "local".to_string(),
208            shellprocexitcode: inner.proc_exit_code,
209            spawn_ts_ms: None,
210            is_agent_pane: false,
211        }
212    }
213
214    /// Publish current controller status via the WPS broker.
215    fn publish_status(&self) {
216        if let Some(ref broker) = self.broker {
217            let status = self.get_status_snapshot();
218            super::publish_controller_status(broker, &status);
219        }
220    }
221
222    /// Emit `agent-message-accepted` for the given config, if it carries
223    /// a `message_id`. Called from both `spawn_turn` (direct path) and
224    /// the `process_waiter` drain site (queue path). No-op if the config
225    /// has no id, or the broker isn't configured.
226    fn emit_message_accepted(&self, config: &SubprocessSpawnConfig) {
227        let Some(id) = config.message_id.as_deref() else { return };
228        let Some(ref broker) = self.broker else { return };
229        let event = super::super::wps::WaveEvent {
230            event: super::super::wps::EVENT_AGENT_MESSAGE_ACCEPTED.to_string(),
231            scopes: vec![format!("block:{}", self.block_id)],
232            sender: String::new(),
233            persist: 0,
234            data: Some(serde_json::json!({
235                "block_id": self.block_id,
236                "message_id": id,
237            })),
238        };
239        broker.publish(event);
240        tracing::info!(
241            block_id = %self.block_id,
242            message_id = %id,
243            "emitted agent-message-accepted"
244        );
245    }
246
247    /// Get the stored session ID (if any).
248    #[allow(dead_code)]
249    pub fn session_id(&self) -> Option<String> {
250        self.inner.lock().unwrap().session_id.clone()
251    }
252
253    /// Record an authoritative session id captured from the CLI's
254    /// stdout init/`thread.started` event. The CLI is the source of
255    /// truth for which session is live, so this ALWAYS overwrites
256    /// any prior value of `inner.session_id` — including values
257    /// previously hydrated from config on a picker reattach (which
258    /// may be stale by the time the CLI speaks).
259    ///
260    /// Free-function form (taking `&Arc<Mutex<…Inner>>` instead of
261    /// `&self`) so the spawn_turn stdout-reader tokio task can call
262    /// it without holding an `Arc<Self>` reference. The
263    /// `&SubprocessController` method below just delegates.
264    ///
265    /// Returns `true` when the value changed (caller should
266    /// broadcast the meta update + persist to block meta). Returns
267    /// `false` when the new id matches the current one — common
268    /// when the CLI emits the same `session_id` on every NDJSON
269    /// frame within a single turn.
270    pub(crate) fn record_captured_session_id_inner(
271        inner: &Mutex<SubprocessControllerInner>,
272        sid: &str,
273    ) -> bool {
274        if sid.is_empty() {
275            return false;
276        }
277        let mut guard = inner.lock().unwrap();
278        let differs = guard.session_id.as_deref() != Some(sid);
279        if differs {
280            guard.session_id = Some(sid.to_string());
281        }
282        differs
283    }
284
285    /// `&self` convenience wrapper around
286    /// `record_captured_session_id_inner` — used by tests that
287    /// already hold a `SubprocessController`.
288    #[cfg(test)]
289    pub(crate) fn record_captured_session_id(&self, sid: &str) -> bool {
290        Self::record_captured_session_id_inner(&self.inner, sid)
291    }
292
293    /// Hydrate `inner.session_id` from a config-supplied id when the
294    /// controller hasn't seen a value yet.
295    ///
296    /// Picker reattach path: a fresh `SubprocessController` is
297    /// registered for the new block, so its `inner.session_id` is
298    /// `None`. The frontend persisted the prior block's session id
299    /// into `agent:sessionid` meta, the websocket / app_api caller
300    /// read it into `SubprocessSpawnConfig::session_id`, and this
301    /// method copies it to inner so the spawn_turn args-builder
302    /// appends `--resume <sid>` on the FIRST turn.
303    ///
304    /// **Hydration is best-effort, not authoritative.** If
305    /// `inner.session_id` is already `Some` we no-op (don't overwrite
306    /// a value already in place — could be a captured-from-stdout
307    /// id from an earlier turn, or a prior hydration on the same
308    /// reattach). Critically, the **CLI's stdout-emitted session id
309    /// is authoritative** and overwrites any prior value at capture
310    /// time (see the stdout-reader block in `spawn_turn`). So if the
311    /// hydrated value is stale, the FIRST turn passes the stale id
312    /// via `--resume` (likely accepted as a no-op or rejected with a
313    /// "no such session" error from the CLI), the CLI then emits its
314    /// own session id in the init event, and `inner.session_id` is
315    /// overwritten with that authoritative value for subsequent
316    /// turns. Without the capture overwrite, a stale hydrated id
317    /// would be re-used forever — that was the bug codex flagged on
318    /// PR #1018 first cut.
319    ///
320    /// Empty `&str` is treated as "no value" so the caller can use
321    /// it unconditionally without filtering.
322    pub(crate) fn hydrate_session_id_from_config(&self, config_sid: Option<&str>) {
323        let Some(sid) = config_sid.filter(|s| !s.is_empty()) else {
324            return;
325        };
326        let mut inner = self.inner.lock().unwrap();
327        if inner.session_id.is_some() {
328            return;
329        }
330        tracing::info!(
331            block_id = %self.block_id,
332            session_id = %sid,
333            "hydrated session_id from config (picker reattach)"
334        );
335        inner.session_id = Some(sid.to_string());
336    }
337
338    /// Spawn a single turn of the agent CLI.
339    ///
340    /// This is the core method — it spawns `claude -p`, writes the user message to stdin,
341    /// reads NDJSON from stdout (publishing WPS events), and waits for exit.
342    ///
343    /// If a session_id exists from a previous turn, `--resume <sid>` is appended to args.
344    pub fn spawn_turn(&self, config: SubprocessSpawnConfig) -> Result<(), String> {
345        if !self.try_lock_run() {
346            // Turn in progress — queue the message for after it exits.
347            let mut inner = self.inner.lock().unwrap();
348            tracing::info!(
349                block_id = %self.block_id,
350                queue_depth = inner.pending_messages.len() + 1,
351                "subprocess busy — message queued"
352            );
353            inner.pending_messages.push_back(config);
354            return Ok(());
355        }
356
357        // Direct-spawn path (queue was empty): emit the accepted event
358        // now so the frontend can promote its pending entry. The
359        // drain-from-queue path (in process_waiter) emits the same
360        // event just before calling spawn_turn recursively.
361        self.emit_message_accepted(&config);
362
363        // Hydrate inner.session_id from the config-supplied id if the
364        // controller hasn't captured one yet. See
365        // `hydrate_session_id_from_config` for the full rationale.
366        self.hydrate_session_id_from_config(config.session_id.as_deref());
367
368        // Build CLI args, appending resume flag + session_id if we have one and the provider supports it
369        let mut args = config.cli_args.clone();
370        {
371            let inner = self.inner.lock().unwrap();
372            if let Some(ref sid) = inner.session_id {
373                if !config.resume_flag.is_empty() {
374                    args.push(config.resume_flag.clone());
375                    args.push(sid.clone());
376                }
377            }
378        }
379
380        // Update status to running
381        {
382            let mut inner = self.inner.lock().unwrap();
383            Self::set_status(&mut inner, STATUS_RUNNING);
384        }
385        self.publish_status();
386        self.health_monitor.set_active_turn(true);
387
388        // Build command — on Windows, .cmd batch wrappers can't be reliably spawned
389        // via cmd.exe /C with piped stdio. Resolve to node <script> instead.
390        let mut cmd = crate::server::cli_handlers::make_cli_cmd(&config.cli_command);
391        cmd.args(&args);
392
393        // On Windows: suppress console-window allocation. Without CREATE_NO_WINDOW,
394        // node.exe spawned from a windowless sidecar may try to create/attach to a
395        // console, causing stdout to go to that console rather than the pipe.
396        #[cfg(windows)]
397        {
398            const CREATE_NO_WINDOW: u32 = 0x0800_0000;
399            cmd.creation_flags(CREATE_NO_WINDOW);
400        }
401        if !config.working_dir.is_empty() {
402            // Expand ~ to home directory (cross-platform)
403            let expanded_dir = if config.working_dir.starts_with("~/") || config.working_dir == "~" {
404                if let Some(home) = dirs::home_dir() {
405                    home.join(config.working_dir.trim_start_matches("~/")).to_string_lossy().to_string()
406                } else {
407                    config.working_dir.clone()
408                }
409            } else {
410                config.working_dir.clone()
411            };
412            // Create directory if it doesn't exist
413            let dir_path = std::path::Path::new(&expanded_dir);
414            if !dir_path.exists() {
415                if let Err(e) = std::fs::create_dir_all(dir_path) {
416                    tracing::warn!(
417                        block_id = %self.block_id,
418                        dir = %expanded_dir,
419                        error = %e,
420                        "failed to create working directory, using current dir"
421                    );
422                } else {
423                    tracing::info!(
424                        block_id = %self.block_id,
425                        dir = %expanded_dir,
426                        "created working directory"
427                    );
428                }
429            }
430            if dir_path.exists() {
431                cmd.current_dir(&expanded_dir);
432            }
433        }
434        for (k, v) in &config.env_vars {
435            let expanded = crate::backend::base::expand_home_dir_safe(v);
436            cmd.env(k, expanded.to_string_lossy().as_ref());
437        }
438        cmd.stdin(std::process::Stdio::piped());
439        cmd.stdout(std::process::Stdio::piped());
440        cmd.stderr(std::process::Stdio::piped());
441
442        // Spawn
443        let mut child = cmd.spawn().map_err(|e| {
444            let mut inner = self.inner.lock().unwrap();
445            Self::set_status(&mut inner, STATUS_DONE);
446            inner.proc_exit_code = -1;
447            self.unlock_run();
448            format!("failed to spawn subprocess: {e}")
449        })?;
450
451        let pid = child.id().unwrap_or(0);
452        tracing::info!(
453            block_id = %self.block_id,
454            pid = pid,
455            cmd = %config.cli_command,
456            args = ?args,
457            "subprocess spawned"
458        );
459
460        // Assign the child to this block's process tracker so every
461        // descendant it spawns (bg bash, dev servers, watchers, etc.)
462        // is caught by the per-platform tracking mechanism and surfaces
463        // in the swarm activity panel. No-op if the tracker global
464        // hasn't been initialized (tests) or on platforms without a
465        // real tracker impl yet (stub handle accepts silently).
466        // See `backend::process_tracker`.
467        if pid != 0 {
468            if let Some(registry) = crate::backend::process_tracker::registry::global() {
469                let tracker = registry.ensure_tracker(&self.block_id);
470                if let Err(e) = tracker.assign_process(pid) {
471                    tracing::warn!(
472                        block_id = %self.block_id,
473                        pid = pid,
474                        err = %e,
475                        "[process-tracker] assign_process failed"
476                    );
477                }
478            }
479        }
480
481        // Store PID
482        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<bool>();
483        {
484            let mut inner = self.inner.lock().unwrap();
485            inner.current_pid = Some(pid);
486            inner.kill_tx = Some(kill_tx);
487        }
488
489        // Take ownership of stdin/stdout
490        let stdin = child.stdin.take().unwrap();
491        let stdout = child.stdout.take().unwrap();
492        let stderr = child.stderr.take().unwrap();
493
494        // Write user message to stdin, then close it.
495        // CRITICAL: This must complete BEFORE the child's stdin timeout
496        // (Claude CLI: 3s). Using std::thread + synchronous write to
497        // bypass the Tokio task scheduler — a tokio::spawn'd task may
498        // not run for seconds on a busy runtime, causing the child to
499        // time out with "no stdin data received in 3s".
500        let message = config.message;
501        let block_id_stdin = self.block_id.clone();
502        {
503            // Convert Tokio's async ChildStdin to a raw OS handle, then
504            // wrap in a std::fs::File for synchronous write. The pipe
505            // buffer (4-64KB on Windows) easily fits our message, so
506            // write_all returns instantly without blocking.
507            #[cfg(unix)]
508            let raw_handle = {
509                use std::os::unix::io::{AsRawFd, FromRawFd};
510                let fd = stdin.as_raw_fd();
511                unsafe { std::fs::File::from_raw_fd(fd) }
512            };
513            #[cfg(windows)]
514            let raw_handle = {
515                use std::os::windows::io::{AsRawHandle, FromRawHandle};
516                let handle = stdin.as_raw_handle();
517                unsafe { std::fs::File::from_raw_handle(handle) }
518            };
519
520            // Spawn a real OS thread (not a Tokio task) for the write.
521            // This ensures it runs immediately regardless of runtime load.
522            // The raw handle is valid as long as `stdin` lives — we move
523            // `stdin` into the thread via a guard to keep it alive.
524            std::thread::spawn(move || {
525                use std::io::Write;
526                let _keep_alive = stdin; // prevent Tokio ChildStdin drop
527                let mut pipe = raw_handle;
528                let payload = format!("{}\n", message);
529                if let Err(e) = pipe.write_all(payload.as_bytes()) {
530                    tracing::warn!(block_id = %block_id_stdin, "subprocess stdin write error: {}", e);
531                    std::mem::forget(pipe); // don't close the handle — _keep_alive owns it
532                    return;
533                }
534                if let Err(e) = pipe.flush() {
535                    tracing::warn!(block_id = %block_id_stdin, "subprocess stdin flush error: {}", e);
536                }
537                std::mem::forget(pipe); // don't double-close — _keep_alive owns the handle
538                // _keep_alive (Tokio ChildStdin) drops here → EOF to the subprocess
539            });
540        }
541
542        // Spawn stdout_reader task
543        let block_id_read = self.block_id.clone();
544        let broker_read = self.broker.clone();
545        let inner_read = Arc::clone(&self.inner);
546        let wstore_read = self.wstore.clone();
547        let event_bus_read = self.event_bus.clone();
548        let filestore_read = self.filestore.clone();
549        let health_read = Arc::clone(&self.health_monitor);
550        let session_id_field = config.session_id_field.clone();
551        tokio::spawn(async move {
552            let reader = BufReader::new(stdout);
553            let mut lines = reader.lines();
554            let mut stats = super::session_stats::SessionStatsAccumulator::new(block_id_read.clone());
555
556            tracing::info!(block_id = %block_id_read, "stdout_reader started");
557
558            loop {
559                match lines.next_line().await {
560                    Err(e) => {
561                        tracing::warn!(block_id = %block_id_read, error = %e, "subprocess stdout read error");
562                        break;
563                    }
564                    Ok(None) => {
565                        tracing::info!(block_id = %block_id_read, "subprocess stdout EOF");
566                        break;
567                    }
568                    Ok(Some(line)) => {
569                        let trimmed = line.trim();
570                        if trimmed.is_empty() {
571                            continue;
572                        }
573
574                        // Track session metadata (debounced 1 s).
575                        // Use `line.len()` (not `trimmed.len()`) to match persistent.rs
576                        // so token_estimate stays consistent across controller types.
577                        stats.record_line(line.len(), &wstore_read);
578
579                        // Classify output for health monitoring
580                        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
581                            let (meaningful, error) = classify_output_line(&parsed);
582                            health_read.record_output(meaningful);
583                            if let Some((class, msg)) = error {
584                                health_read.record_error(class, msg);
585                            }
586                        }
587
588                        // Try to capture session/thread ID from the provider's init event.
589                        // Claude: {"type":"system","subtype":"init","session_id":"..."}
590                        // Gemini: {"type":"init","session_id":"..."}
591                        // Codex:  {"type":"thread.started","thread_id":"..."}
592                        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(trimmed) {
593                            if let Some(sid) = parsed.get(&session_id_field).and_then(|v| v.as_str()) {
594                                let sid_string = sid.to_string();
595                                // Authoritative CLI capture —
596                                // overwrites any prior value
597                                // (including stale hydrated ids
598                                // from picker reattach). De-dups
599                                // when the same id repeats across
600                                // turns. See
601                                // `record_captured_session_id_inner`
602                                // for the unit-tested form.
603                                let changed = SubprocessController::record_captured_session_id_inner(
604                                    &inner_read,
605                                    &sid_string,
606                                );
607                                if changed {
608                                    tracing::info!(
609                                        block_id = %block_id_read,
610                                        field = %session_id_field,
611                                        session_id = %sid_string,
612                                        "captured session id"
613                                    );
614
615                                    // Persist session_id to block metadata
616                                    if let Some(ref store) = wstore_read {
617                                        let oref_str = format!("block:{}", block_id_read);
618                                        let mut meta_update =
619                                            crate::backend::obj::MetaMapType::new();
620                                        meta_update.insert(
621                                            "agent:sessionid".to_string(),
622                                            serde_json::Value::String(sid_string),
623                                        );
624                                        if let Err(e) = crate::server::service::update_object_meta(
625                                            store, &oref_str, &meta_update,
626                                        ) {
627                                            tracing::warn!(
628                                                block_id = %block_id_read,
629                                                error = %e,
630                                                "failed to persist agent:sessionid"
631                                            );
632                                        } else if let Some(ref event_bus) = event_bus_read {
633                                            // Broadcast metadata update to frontend
634                                            if let Ok(updated_block) = store.must_get::<crate::backend::obj::Block>(&block_id_read) {
635                                                let update_data = serde_json::to_value(
636                                                    &crate::backend::obj::WaveObjUpdate {
637                                                        updatetype: "update".into(),
638                                                        otype: "block".into(),
639                                                        oid: block_id_read.clone(),
640                                                        obj: Some(crate::backend::obj::wave_obj_to_value(&updated_block)),
641                                                    },
642                                                )
643                                                .ok();
644                                                event_bus.broadcast_event(
645                                                    &crate::backend::eventbus::WSEventType {
646                                                        eventtype: "waveobj:update".to_string(),
647                                                        oref: oref_str,
648                                                        data: update_data,
649                                                    },
650                                                );
651                                            }
652                                        }
653                                    }
654                                }
655                            }
656                        }
657
658                        // Publish the NDJSON line as a WPS blockfile event on the "output" subject
659                        // and write-through to FileStore for persistent history (Phase 1.3).
660                        if let Some(ref broker) = broker_read {
661                            tracing::info!(block_id = %block_id_read, line = %trimmed, "subprocess stdout → blockfile");
662                            // Include the newline so the frontend line splitter works correctly
663                            let line_with_newline = format!("{}\n", trimmed);
664                            super::shell::handle_append_block_file(
665                                broker,
666                                &block_id_read,
667                                SUBPROCESS_OUTPUT_SUBJECT,
668                                line_with_newline.as_bytes(),
669                                filestore_read.as_ref(),
670                            );
671                        }
672                    }
673                }
674            }
675
676            tracing::info!(block_id = %block_id_read, "stdout_reader exiting");
677        });
678
679        // Spawn stderr reader (log warnings, don't publish)
680        let block_id_err = self.block_id.clone();
681        tokio::spawn(async move {
682            let reader = BufReader::new(stderr);
683            let mut lines = reader.lines();
684            loop {
685                match lines.next_line().await {
686                    Err(e) => {
687                        tracing::warn!(block_id = %block_id_err, error = %e, "subprocess stderr read error");
688                        break;
689                    }
690                    Ok(None) => break,
691                    Ok(Some(line)) => {
692                        if !line.trim().is_empty() {
693                            tracing::info!(
694                                block_id = %block_id_err,
695                                stderr = %line,
696                                "subprocess stderr"
697                            );
698                        }
699                    }
700                }
701            }
702        });
703
704        // Spawn health watchdog (checks every 5s while turn is active)
705        let health_watchdog = Arc::clone(&self.health_monitor);
706        tokio::spawn(async move {
707            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
708            loop {
709                interval.tick().await;
710                if !health_watchdog.is_active_turn() {
711                    break;
712                }
713                health_watchdog.check();
714            }
715        });
716
717        // Spawn process_waiter task
718        let inner_wait = Arc::clone(&self.inner);
719        let block_id_wait = self.block_id.clone();
720        let broker_wait = self.broker.clone();
721        let run_lock = Arc::clone(&self.run_lock);
722        let health_wait = Arc::clone(&self.health_monitor);
723        let self_ref_wait = self.self_ref.lock().unwrap().clone().unwrap_or_default();
724        tokio::spawn(async move {
725            // Wait for either process exit or kill signal
726            tokio::select! {
727                exit_result = child.wait() => {
728                    let exit_code = match exit_result {
729                        Ok(status) => status.code().unwrap_or(-1),
730                        Err(e) => {
731                            tracing::warn!(
732                                block_id = %block_id_wait,
733                                error = %e,
734                                "subprocess wait error"
735                            );
736                            -1
737                        }
738                    };
739
740                    tracing::info!(
741                        block_id = %block_id_wait,
742                        exit_code = exit_code,
743                        "subprocess exited"
744                    );
745
746                    // Update inner state
747                    {
748                        let mut inner = inner_wait.lock().unwrap();
749                        inner.proc_exit_code = exit_code;
750                        SubprocessController::set_status(&mut inner, STATUS_DONE);
751                        inner.current_pid = None;
752                        inner.kill_tx = None;
753                    }
754                }
755                force = kill_rx => {
756                    let force = force.unwrap_or(false);
757                    tracing::info!(
758                        block_id = %block_id_wait,
759                        force = force,
760                        "subprocess kill requested"
761                    );
762
763                    if force {
764                        let _ = child.kill().await;
765                    } else {
766                        // On Unix, send SIGTERM. On Windows, kill() is the only option.
767                        #[cfg(unix)]
768                        {
769                            if let Some(pid) = child.id() {
770                                unsafe { libc::kill(pid as i32, libc::SIGTERM); }
771                            }
772                            // Give it a moment to exit gracefully
773                            tokio::time::sleep(tokio::time::Duration::from_millis(
774                                super::DEFAULT_GRACEFUL_KILL_WAIT_MS,
775                            )).await;
776                            let _ = child.kill().await;
777                        }
778                        #[cfg(not(unix))]
779                        {
780                            let _ = child.kill().await;
781                        }
782                    }
783
784                    let _ = child.wait().await;
785
786                    {
787                        let mut inner = inner_wait.lock().unwrap();
788                        inner.proc_exit_code = -1;
789                        SubprocessController::set_status(&mut inner, STATUS_DONE);
790                        inner.current_pid = None;
791                        inner.kill_tx = None;
792                    }
793                }
794            }
795
796            // Update health monitor with exit status
797            {
798                let inner = inner_wait.lock().unwrap();
799                health_wait.set_exited(inner.proc_exit_code);
800            }
801
802            // Publish done status
803            if let Some(ref broker) = broker_wait {
804                let status = {
805                    let inner = inner_wait.lock().unwrap();
806                    BlockControllerRuntimeStatus {
807                        blockid: block_id_wait.clone(),
808                        version: inner.status_version,
809                        shellprocstatus: inner.proc_status.clone(),
810                        shellprocconnname: "local".to_string(),
811                        shellprocexitcode: inner.proc_exit_code,
812                        spawn_ts_ms: None,
813                        is_agent_pane: false,
814                    }
815                };
816                super::publish_controller_status(broker, &status);
817            }
818
819            // Release run lock
820            run_lock.store(false, Ordering::SeqCst);
821
822            // Drain message queue: if messages were queued while this turn
823            // was running, pop the next one and spawn it via the weak
824            // self-reference.
825            let next_config = {
826                let mut inner = inner_wait.lock().unwrap();
827                inner.pending_messages.pop_front()
828            };
829            if let Some(config) = next_config {
830                if let Some(ctrl) = self_ref_wait.upgrade() {
831                    tracing::info!(
832                        block_id = %block_id_wait,
833                        "draining queued message"
834                    );
835                    if let Err(e) = ctrl.spawn_turn(config) {
836                        tracing::warn!(
837                            block_id = %block_id_wait,
838                            error = %e,
839                            "failed to spawn queued turn"
840                        );
841                    }
842                }
843            }
844        });
845
846        Ok(())
847    }
848
849    /// Stop the currently running subprocess.
850    pub fn stop_subprocess(&self, force: bool) -> Result<(), String> {
851        let kill_tx = {
852            let mut inner = self.inner.lock().unwrap();
853            inner.kill_tx.take()
854        };
855        match kill_tx {
856            Some(tx) => {
857                let _ = tx.send(force);
858                Ok(())
859            }
860            None => Ok(()), // No running process
861        }
862    }
863}
864
865impl Controller for SubprocessController {
866    fn start(
867        &self,
868        _block_meta: super::super::obj::MetaMapType,
869        _rt_opts: Option<serde_json::Value>,
870        _force: bool,
871    ) -> Result<(), String> {
872        // SubprocessController doesn't auto-start on resync.
873        // Turns are initiated by SubprocessSpawnCommand / AgentInputCommand.
874        tracing::info!(
875            block_id = %self.block_id,
876            "subprocess controller registered (no auto-start)"
877        );
878        Ok(())
879    }
880
881    fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String> {
882        // Stop any running subprocess
883        self.stop_subprocess(true)?;
884
885        let mut inner = self.inner.lock().unwrap();
886        if inner.proc_status != new_status {
887            Self::set_status(&mut inner, new_status);
888        }
889
890        Ok(())
891    }
892
893    fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
894        self.get_status_snapshot()
895    }
896
897    fn send_input(&self, input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
898        // SubprocessController doesn't accept raw PTY input — user messages
899        // go through spawn_turn() (via AgentInputCommand RPC).
900        //
901        // Signals ARE accepted though: the agent-pane composer's Esc
902        // handler sends SIGINT via `ControllerInputCommand({signame:"SIGINT"})`
903        // when the user wants to cancel an in-flight turn. Route that to
904        // `stop_subprocess(force=true)` so the current subprocess is
905        // killed via `kill_tx`. Without this, Esc was silently rejected
906        // and the agent kept running.
907        if let Some(sig) = input.sig_name.as_deref() {
908            if sig == "SIGINT" || sig == "SIGTERM" {
909                tracing::info!(
910                    block_id = %self.block_id,
911                    sig = %sig,
912                    "subprocess controller: received signal, killing current turn"
913                );
914                return self.stop_subprocess(true);
915            }
916            return Err(format!(
917                "subprocess controller: unsupported signal {sig} (only SIGINT/SIGTERM)"
918            ));
919        }
920        if input.input_data.is_some() {
921            return Err("subprocess controller does not accept raw input; use AgentInputCommand".to_string());
922        }
923        // Term resize / other input types: accepted-no-op.
924        Ok(())
925    }
926
927    fn controller_type(&self) -> &str {
928        BLOCK_CONTROLLER_SUBPROCESS
929    }
930
931    fn block_id(&self) -> &str {
932        &self.block_id
933    }
934
935    fn as_any(&self) -> &dyn std::any::Any {
936        self
937    }
938}
939
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built controller reports its type, block id and INIT status.
    #[test]
    fn test_subprocess_controller_new() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-1"),
            None,
            None,
            None,
            None,
        );
        assert_eq!(controller.controller_type(), BLOCK_CONTROLLER_SUBPROCESS);
        assert_eq!(controller.block_id(), "block-1");

        let snapshot = controller.get_runtime_status();
        assert_eq!(snapshot.shellprocstatus, STATUS_INIT);
        assert_eq!(snapshot.blockid, "block-1");
    }

    /// Raw byte input is refused and the error points at AgentInputCommand.
    #[test]
    fn test_subprocess_controller_rejects_raw_input() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-1"),
            None,
            None,
            None,
            None,
        );
        let outcome = controller.send_input(BlockInputUnion::data(b"hello".to_vec()), None);
        let message = outcome.expect_err("raw input must be rejected");
        assert!(message.contains("AgentInputCommand"));
    }

    /// `start` succeeds but leaves the controller untouched.
    #[test]
    fn test_subprocess_controller_start_is_noop() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-1"),
            None,
            None,
            None,
            None,
        );
        assert!(controller.start(HashMap::new(), None, false).is_ok());

        // No auto-start: the status must still be INIT.
        assert_eq!(controller.get_runtime_status().shellprocstatus, STATUS_INIT);
    }

    /// Stopping with nothing running succeeds and applies the new status.
    #[test]
    fn test_subprocess_controller_stop_when_idle() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-1"),
            None,
            None,
            None,
            None,
        );
        assert!(controller.stop(true, STATUS_DONE).is_ok());
        assert_eq!(controller.get_runtime_status().shellprocstatus, STATUS_DONE);
    }

    /// No session id is known before any turn has run.
    #[test]
    fn test_subprocess_controller_session_id_initially_none() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-1"),
            None,
            None,
            None,
            None,
        );
        assert!(controller.session_id().is_none());
    }

    /// A spawn request that arrives while the run lock is held is queued.
    #[test]
    fn test_subprocess_controller_concurrent_spawn_blocked() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-1"),
            None,
            None,
            None,
            None,
        );

        // Pretend a turn is already in flight.
        controller.run_lock.store(true, Ordering::SeqCst);

        let request = SubprocessSpawnConfig {
            cli_command: String::from("echo"),
            cli_args: Vec::new(),
            working_dir: String::new(),
            env_vars: HashMap::new(),
            message: String::from("test"),
            resume_flag: String::new(),
            session_id_field: String::from("session_id"),
            message_id: None,
            session_id: None,
        };

        // spawn_turn queues instead of rejecting when busy.
        assert!(controller.spawn_turn(request).is_ok());

        // The request must be sitting in the pending queue.
        {
            let state = controller.inner.lock().unwrap();
            assert_eq!(state.pending_messages.len(), 1);
            assert_eq!(
                state
                    .pending_messages
                    .front()
                    .map(|queued| queued.message.as_str()),
                Some("test")
            );
        }

        // Give the run lock back.
        controller.run_lock.store(false, Ordering::SeqCst);
    }

    /// Regression test for the 2026-05-24 report "clicking My Agents
    /// re-inserts the startup context".
    ///
    /// A reattached block gets a brand-new SubprocessController whose
    /// `inner.session_id` is None. The picker reattach flow stores the
    /// previous block's session id in `agent:sessionid` meta, the caller
    /// passes it along in `SubprocessSpawnConfig::session_id`, and
    /// spawn_turn runs `hydrate_session_id_from_config` before building
    /// args. Once hydrated, the existing args-builder appends
    /// `--resume <sid>` on that very first turn, so the startup context is
    /// not injected again.
    #[test]
    fn hydrate_session_id_populates_inner_when_none() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-reattach"),
            None,
            None,
            None,
            None,
        );
        assert!(controller.inner.lock().unwrap().session_id.is_none());

        controller.hydrate_session_id_from_config(Some("prior-sid-from-meta"));

        let hydrated = controller.inner.lock().unwrap().session_id.clone();
        assert_eq!(hydrated.as_deref(), Some("prior-sid-from-meta"));
    }

    /// Hydration is best-effort rather than authoritative: it only fills
    /// `inner.session_id` when that is None. This is not a
    /// captured-id-wins rule (that is enforced at capture time); it merely
    /// avoids re-hydrating on every spawn_turn call during a controller's
    /// lifetime. A stale value is acceptable because the next CLI emit
    /// handled by `record_captured_session_id_inner` overwrites it.
    #[test]
    fn hydrate_session_id_is_noop_when_value_already_present() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-resume"),
            None,
            None,
            None,
            None,
        );
        controller.inner.lock().unwrap().session_id = Some(String::from("captured-sid"));

        controller.hydrate_session_id_from_config(Some("different-config-sid"));

        let current = controller.inner.lock().unwrap().session_id.clone();
        assert_eq!(
            current.as_deref(),
            Some("captured-sid"),
            "hydration must not overwrite an existing value"
        );
    }

    /// Once the CLI speaks, it is authoritative for the session id.
    ///
    /// Background (Codex P1 on the first cut of PR #1018): the original
    /// `if !already_captured` guard in the stdout reader let a hydrated,
    /// possibly stale, session id lock out every later CLI-emitted value,
    /// so a wrong `--resume <stale>` would be passed forever. The fix,
    /// `record_captured_session_id_inner`, always overwrites and reports
    /// whether the value changed.
    #[test]
    fn record_captured_overwrites_hydrated_value() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-overwrite"),
            None,
            None,
            None,
            None,
        );
        controller.hydrate_session_id_from_config(Some("stale-hydrated-sid"));
        assert_eq!(
            controller.session_id().as_deref(),
            Some("stale-hydrated-sid")
        );

        let changed = controller.record_captured_session_id("authoritative-sid");
        assert!(changed, "value differs from hydrated; must report changed");
        assert_eq!(
            controller.session_id().as_deref(),
            Some("authoritative-sid"),
            "CLI-emitted id must overwrite hydrated value"
        );
    }

    /// Real CLI streams repeat `session_id` on every NDJSON frame, not
    /// only the first. De-duplication is a performance knob (it skips the
    /// meta-update broadcast on repeats), not a correctness gate: the
    /// captured id is still authoritative on first emit.
    #[test]
    fn record_captured_dedups_same_value() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-dedup"),
            None,
            None,
            None,
            None,
        );
        assert!(controller.record_captured_session_id("sid-1"));
        assert!(
            !controller.record_captured_session_id("sid-1"),
            "second call with same value must return false (no broadcast)"
        );
        assert_eq!(controller.session_id().as_deref(), Some("sid-1"));
    }

    /// Defensive: an empty string from a malformed CLI emit must not wipe
    /// out a valid earlier value.
    #[test]
    fn record_captured_ignores_empty() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-empty"),
            None,
            None,
            None,
            None,
        );
        controller.record_captured_session_id("real-sid");
        assert!(
            !controller.record_captured_session_id(""),
            "empty CLI emit must be ignored"
        );
        assert_eq!(controller.session_id().as_deref(), Some("real-sid"));
    }

    /// Greenfield launches pass `None` (or `Some("")` when the caller did
    /// not filter). Hydration must do nothing in both cases so that
    /// `inner.session_id` stays None until the CLI captures its own.
    #[test]
    fn hydrate_session_id_ignores_empty_and_none() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-greenfield"),
            None,
            None,
            None,
            None,
        );

        controller.hydrate_session_id_from_config(None);
        assert!(controller.inner.lock().unwrap().session_id.is_none());

        controller.hydrate_session_id_from_config(Some(""));
        assert!(controller.inner.lock().unwrap().session_id.is_none());
    }

    /// While the controller is busy, spawn_turn parks the config for the
    /// drain-from-queue path. Hydration only happens on the direct-spawn
    /// path (after try_lock_run), so the queued config has to carry
    /// `session_id` through unchanged for the drain path's recursive call
    /// to see it.
    #[test]
    fn spawn_turn_preserves_session_id_in_queued_config() {
        let controller = SubprocessController::new(
            String::from("tab-1"),
            String::from("block-queued"),
            None,
            None,
            None,
            None,
        );
        controller.run_lock.store(true, Ordering::SeqCst);

        let request = SubprocessSpawnConfig {
            cli_command: String::from("claude"),
            cli_args: vec![String::from("-p")],
            working_dir: String::new(),
            env_vars: HashMap::new(),
            message: String::from("hi"),
            resume_flag: String::from("--resume"),
            session_id_field: String::from("session_id"),
            message_id: None,
            session_id: Some(String::from("prior-sid")),
        };
        let _ = controller.spawn_turn(request);

        let state = controller.inner.lock().unwrap();
        assert_eq!(state.pending_messages.len(), 1);
        assert_eq!(
            state
                .pending_messages
                .front()
                .and_then(|queued| queued.session_id.as_deref()),
            Some("prior-sid"),
        );
        // Hydration has not run yet: the busy lock bypassed the
        // direct-spawn path, and the drain will hydrate on dequeue.
        assert!(state.session_id.is_none());
    }
}